//package com.example.springboottest.example.kafka.simple.flow;
//
//import org.apache.kafka.common.serialization.Serdes;
//import org.apache.kafka.streams.KafkaStreams;
//import org.apache.kafka.streams.StreamsBuilder;
//import org.apache.kafka.streams.StreamsConfig;
//import org.apache.kafka.streams.kstream.KStream;
//import org.apache.kafka.streams.kstream.KTable;
//import org.apache.kafka.streams.kstream.Produced;
//
//import java.util.Arrays;
//import java.util.Locale;
//import java.util.Properties;
//import java.util.concurrent.CountDownLatch;
//
///**
// * @author fn
// */
//public class WordCountDemo {
//    public static void main(String[] args) {
//        Properties props = new Properties();
//        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
//        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
//        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//
//        StreamsBuilder builder = new StreamsBuilder();
//        KStream<String, String> source = builder.stream("streams-plaintext-input");
//        KTable<String, Long> counts = source.flatMapValues(
//                value -> Arrays.asList(
//                        value.toLowerCase(Locale.getDefault())
//                                .split(""))
//        ).groupBy((key, value) -> value).count();
//        counts.toStream().to("streams-wordcount-output",
//                Produced.with(Serdes.String(), Serdes.Long()));
//
//        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
//        final CountDownLatch latch = new CountDownLatch(1);
//
//        Runtime.getRuntime().addShutdownHook(
//                new Thread("streams-wordcount-shutdown-hook") {
//                    @Override
//                    public void run() {
//                        streams.close();
//                        latch.countDown();
//                    }
//                });
//
//        try {
//            streams.start();
//            latch.await();
//        } catch (
//                Throwable e) {
//            System.exit(1);
//
//        }
//        System.exit(0);
//    }
//}